{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Part I. ETL Pipeline for Pre-Processing the Files"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Import Python packages "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Import Python packages \n",
    "import pandas as pd\n",
    "import cassandra\n",
    "import re\n",
    "import os\n",
    "import glob\n",
    "import numpy as np\n",
    "import json\n",
    "import csv"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Creating list of filepaths to process original event csv data files"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "/home/workspace\n",
      "Total number of files:  30\n"
     ]
    }
   ],
   "source": [
    "# checking the current working directory\n",
    "print(os.getcwd())\n",
    "\n",
    "# Getting the current folder and subfolder of event data\n",
    "filepath = os.getcwd() + '/event_data'\n",
    "\n",
    "# Create a for loop to create a list of files and collect each filepath\n",
    "for root, dirs, files in os.walk(filepath):\n",
    "    \n",
    "# join the file path and roots with the subdirectories using glob\n",
    "    file_path_list = glob.glob(os.path.join(root,'*'))\n",
    "print(\"Total number of files: \", len(file_path_list))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Processing the files to create the data file csv that will be used for Apache Casssandra tables"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Initiating an empty list of rows that will be generated from each file\n",
    "full_data_rows_list = [] \n",
    "    \n",
    "# for every filepath in the file path list \n",
    "for f in file_path_list:\n",
    "\n",
    "# reading csv file \n",
    "    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: \n",
    "        # creating a csv reader object \n",
    "        csvreader = csv.reader(csvfile) \n",
    "        next(csvreader)    \n",
    " # extracting each data row one by one and append it        \n",
    "        for line in csvreader:\n",
    "            full_data_rows_list.append(line) "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Total number of rows:  8056\n"
     ]
    }
   ],
   "source": [
    "print(\"Total number of rows: \",len(full_data_rows_list))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "# creating a smaller event data csv file called event_datafile_new csv that will be used to insert data into the \\\n",
    "# Apache Cassandra tables\n",
    "csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)\n",
    "with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:\n",
    "    writer = csv.writer(f, dialect='myDialect')\n",
    "    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\\\n",
    "                'level','location','sessionId','song','userId'])\n",
    "    \n",
    "    for row in full_data_rows_list:\n",
    "        if (row[0] == ''):\n",
    "            #skipping the row if artist column is empty\n",
    "            continue\n",
    "        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Total number of rows after cleaning the data: 6821\n"
     ]
    }
   ],
   "source": [
    "# check the number of rows in your csv file\n",
    "with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:\n",
    "    print(\"Total number of rows after cleaning the data:\",sum(1 for line in f))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Part II. Creating a denormalized table in Apache Cassandra"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## The event_datafile_new.csv contains the following columns: \n",
    "- artist \n",
    "- firstName of user\n",
    "- gender of user\n",
    "- item number in session\n",
    "- last name of user\n",
    "- length of the song\n",
    "- level (paid or free song)\n",
    "- location of the user\n",
    "- sessionId\n",
    "- song title\n",
    "- userId\n",
    "\n",
    "The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>\n",
    "\n",
    "<img src=\"images/image_event_datafile_new.jpg\">"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Creating a Cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "# This should make a connection to a Cassandra instance your local machine \n",
    "# (127.0.0.1)\n",
    "\n",
    "from cassandra.cluster import Cluster\n",
    "cluster = Cluster()\n",
    "\n",
    "# To establish connection and begin executing queries, need a session\n",
    "session = cluster.connect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Creating a Keyspace"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Creating a Keyspace, just like database(name)  in RDBMS\n",
    "try:\n",
    "    session.execute(\"\"\"\n",
    "    CREATE KEYSPACE IF NOT EXISTS udacity \n",
    "    WITH REPLICATION = \n",
    "    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }\"\"\"\n",
    ")\n",
    "\n",
    "except Exception as e:\n",
    "    print(e)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Set Keyspace"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Setting up KEYSPACE to the keyspace specified above\n",
    "try:\n",
    "    session.set_keyspace('udacity')\n",
    "except Exception as e:\n",
    "    print(e)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_table(query):\n",
    "    '''\n",
    "    This function is used to create table in KEYSPACE.\n",
    "    arg:\n",
    "        query(str): CREATE table query statement\n",
    "    return:\n",
    "        None \n",
    "    '''\n",
    "    try:\n",
    "        session.execute(query)\n",
    "    except Exception as e:\n",
    "        print(e)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "def insert_data(file, session, query, indicies, dtypes):\n",
    "    '''\n",
    "    Creating pipeline to insert data into a Apache Cassandra table from CSV file.\n",
    "    args:\n",
    "        file(str): CSV file name\n",
    "        session(object): Apache Cassandra session\n",
    "        query(str): INSERT query statement \n",
    "        indicies(list): list of indicies to pass to INSERT statement\n",
    "        dtypes(list): list of data types to convert indices values\n",
    "    \n",
    "    return: \n",
    "        None      \n",
    "    '''\n",
    "    #check both are of same size else stop executing the rest and throw an error\n",
    "    assert(len(indicies) == len(dtypes))\n",
    "    with open(file, encoding = 'utf8') as f:\n",
    "        csvreader = csv.reader(f)\n",
    "        next(csvreader) # skip header\n",
    "        for line in csvreader:\n",
    "            session.execute(query, tuple(dtype(line[x]) for x, dtype in zip(indicies, dtypes)))  "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Query-1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "## Creating Table for Query-1\n",
    "query = \"CREATE TABLE IF NOT EXISTS song_playlist_item_session\"\n",
    "query += '''( session_id int,\n",
    "              item_in_session int,\n",
    "              artist text, \n",
    "              length float,  \n",
    "              song text,  \n",
    "            PRIMARY KEY(session_id,item_in_session))'''\n",
    "## PRIMARY KEY: combining session_id and item_in_session partion key will make the row unique for the above table and \\\n",
    "## we used the two partion keys in the where class in the mentioned above order.\n",
    "create_table(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "# filename\n",
    "file = 'event_datafile_new.csv'\n",
    "\n",
    "# Query-1 indices and dtypes\n",
    "indicies = [8, 3, 0, 5, 9]\n",
    "dtypes = [int, int, str, float, str]\n",
    "\n",
    "# INSERT query statement\n",
    "query = \"INSERT INTO song_playlist_item_session(session_id, item_in_session, artist, length, song)\"\n",
    "query = query + \"VALUES(%s, %s, %s, %s,%s)\"\n",
    "\n",
    "# INSERT into the table\n",
    "insert_data(file, session, query, indicies, dtypes)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Verify Query-1 with the SELECT statement"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "## This query will fetch the artist, song title and length of the song \\\n",
    "## of session id 338 and item session 4\n",
    "query = \"\"\"SELECT artist, length, song\n",
    "           FROM song_playlist_item_session\n",
    "           WHERE session_id = 338 AND item_in_session = 4\"\"\"\n",
    "\n",
    "try:\n",
    "    rows = session.execute(query)\n",
    "except Exception as e:\n",
    "    print(e)\n",
    "\n",
    "# collecting all rows in list as a tuple\n",
    "allRows = [] \n",
    "for row in rows:\n",
    "    allRows.append((row.artist, row.song,row.length))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>artist</th>\n",
       "      <th>song</th>\n",
       "      <th>length</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>Faithless</td>\n",
       "      <td>Music Matters (Mark Knight Dub)</td>\n",
       "      <td>495.307312</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "      artist                             song      length\n",
       "0  Faithless  Music Matters (Mark Knight Dub)  495.307312"
      ]
     },
     "execution_count": 15,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "#creating dataFrame for the result query\n",
    "df = pd.DataFrame(allRows,columns=['artist','song','length'])\n",
    "# printing top 5 rows in the dataFrame\n",
    "df.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Query-2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "## Creating Table for Query-2\n",
    "query = \"CREATE TABLE IF NOT EXISTS song_playlist_session\"\n",
    "query += '''( user_id int,\n",
    "              session_id int, \n",
    "              item_in_session int,\n",
    "              artist text,\n",
    "              first_name text,\n",
    "              last_name text,\n",
    "              song text,\n",
    "           PRIMARY KEY((user_id, session_id), item_in_session))'''\n",
    "create_table(query)            "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Query-2 indices and dtypes\n",
    "indicies = [10, 8, 3, 0, 1, 4, 9]\n",
    "dtypes = [ int, int, int, str, str,str, str]\n",
    "\n",
    "# INSERT query statement\n",
    "query = \"INSERT INTO song_playlist_session(user_id, session_id, item_in_session, artist, first_name,last_name, song)\"\n",
    "query = query + \"VALUES(%s, %s, %s, %s,%s, %s,%s)\"\n",
    "\n",
    "# INSERT into the table\n",
    "insert_data(file, session, query, indicies, dtypes)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Verify Query-2 with the SELECT statement"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "## This query will fetch name of artist, song (sorted by itemInSession) and user (first and last name)\\\n",
    "## for userid = 10, sessionid = 182\n",
    "query = \"\"\" SELECT artist, song, first_name, last_name\n",
    "           FROM song_playlist_session\n",
    "           WHERE user_id = 10 AND session_id = 182\n",
    "           order by item_in_session DESC \"\"\"\n",
    "\n",
    "try:\n",
    "    rows = session.execute(query)\n",
    "except Exception as e:\n",
    "    print(e)\n",
    "\n",
    "# collecting all rows in list as a tuple\n",
    "allRows = [] \n",
    "for row in rows:\n",
    "    allRows.append((row.artist, row.song,row.first_name, row.last_name))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>artist</th>\n",
       "      <th>song</th>\n",
       "      <th>first_name</th>\n",
       "      <th>last_name</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>Lonnie Gordon</td>\n",
       "      <td>Catch You Baby (Steve Pitron &amp; Max Sanna Radio...</td>\n",
       "      <td>Sylvie</td>\n",
       "      <td>Cruz</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>Sebastien Tellier</td>\n",
       "      <td>Kilometer</td>\n",
       "      <td>Sylvie</td>\n",
       "      <td>Cruz</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>Three Drives</td>\n",
       "      <td>Greece 2000</td>\n",
       "      <td>Sylvie</td>\n",
       "      <td>Cruz</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>Down To The Bone</td>\n",
       "      <td>Keep On Keepin' On</td>\n",
       "      <td>Sylvie</td>\n",
       "      <td>Cruz</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "              artist                                               song  \\\n",
       "0      Lonnie Gordon  Catch You Baby (Steve Pitron & Max Sanna Radio...   \n",
       "1  Sebastien Tellier                                          Kilometer   \n",
       "2       Three Drives                                        Greece 2000   \n",
       "3   Down To The Bone                                 Keep On Keepin' On   \n",
       "\n",
       "  first_name last_name  \n",
       "0     Sylvie      Cruz  \n",
       "1     Sylvie      Cruz  \n",
       "2     Sylvie      Cruz  \n",
       "3     Sylvie      Cruz  "
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "#creating dataFrame for the query result.\n",
    "df = pd.DataFrame(allRows,columns=['artist','song','first_name','last_name'])\n",
    "# printing top 5 rows in the dataFrame\n",
    "df.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Query-3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "## Creating Table for Query-3\n",
    "query = \"CREATE TABLE IF NOT EXISTS song_playlist_library\"\n",
    "query += '''( song text,\n",
    "              user_id int,\n",
    "              first_name text,\n",
    "              last_name text,\n",
    "           PRIMARY KEY(song, user_id))'''\n",
    "## PRIMARY KEY: combining song,  session_id and item_in_session partion keys will make the row unique for the above table and \\\n",
    "## used song in where clause to fetch the all the rows based on song title\n",
    "\n",
    "create_table(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Query-3 indices and dtypes\n",
    "indicies = [9, 10, 1, 4]\n",
    "dtypes = [str, int, str, str]\n",
    "\n",
    "# INSERT query statement\n",
    "query = \"INSERT INTO song_playlist_library(song, user_id, first_name, last_name)\"\n",
    "query = query + \"VALUES(%s,%s, %s, %s)\"\n",
    "\n",
    "# INSERT into the table\n",
    "insert_data(file, session, query, indicies, dtypes)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Verify Query-3 with the SELECT statement"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [],
   "source": [
    "## This query will fetch me user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'\n",
    "query = \"\"\" SELECT first_name, last_name\n",
    "           FROM song_playlist_library\n",
    "           WHERE song = 'All Hands Against His Own' \"\"\"\n",
    "\n",
    "try:\n",
    "    rows = session.execute(query)\n",
    "except Exception as e:\n",
    "    print(e)\n",
    "    \n",
    "# collecting all rows in list as a tuple\n",
    "allRows = []    \n",
    "for row in rows:\n",
    "    allRows.append((row.first_name, row.last_name))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>first_name</th>\n",
       "      <th>last_name</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>Jacqueline</td>\n",
       "      <td>Lynch</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>Tegan</td>\n",
       "      <td>Levine</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>Sara</td>\n",
       "      <td>Johnson</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   first_name last_name\n",
       "0  Jacqueline     Lynch\n",
       "1       Tegan    Levine\n",
       "2        Sara   Johnson"
      ]
     },
     "execution_count": 23,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "#creating dataFrame for the result query\n",
    "df = pd.DataFrame(allRows,columns=['first_name','last_name'])\n",
    "# printing top 5 rows in the dataFrame\n",
    "df.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Drop the tables before closing out the sessions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [],
   "source": [
    "def drop_table(table_names):\n",
    "    \n",
    "    '''\n",
    "    This function helps in drop the tables in the KEYSAPCE.\n",
    "    arg:\n",
    "        table_names(list): list of table names to drop in the KEYSPACE\n",
    "    return:\n",
    "        None\n",
    "    '''\n",
    "    for table in table_names:\n",
    "        query = \"DROP TABLE \"+table\n",
    "        try:\n",
    "            session.execute(query)\n",
    "        except Exception as e:\n",
    "            print(e)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [],
   "source": [
    "# List of table names to drop\n",
    "table_names = ['song_playlist_item_session', 'song_playlist_session', 'song_playlist_library']\n",
    "\n",
    "# Drop tables\n",
    "drop_table(table_names)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Close the session and cluster connection¶"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [],
   "source": [
    "session.shutdown()\n",
    "cluster.shutdown()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
